{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "ef164c16",
   "metadata": {},
   "source": [
    "# Scalable online XGBoost inference with Ray Serve\n",
    "\n",
    "<div align=\"left\">\n",
    "<a target=\"_blank\" href=\"https://console.anyscale.com/\"><img src=\"https://img.shields.io/badge/🚀 Run_on-Anyscale-9hf\"></a>&nbsp;\n",
    "<a href=\"https://github.com/anyscale/e2e-xgboost\" role=\"button\"><img src=\"https://img.shields.io/static/v1?label=&amp;message=View%20On%20GitHub&amp;color=586069&amp;logo=github&amp;labelColor=2f363d\"></a>&nbsp;\n",
    "</div>\n",
    "\n",
    "This tutorial launches an online service that:\n",
    "- deploys trained XGBoost model artifacts to generate predictions\n",
    "- autoscales based on real-time incoming traffic\n",
    "- covers observability and debugging around the service\n",
    "\n",
    "Note that this notebook requires that you run the [Distributed training of an XGBoost model](./01-Distributed_Training.ipynb) tutorial to generate the pre-trained model artifacts that this tutorial fetches."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "459a66c4",
   "metadata": {},
   "source": [
    "[Ray Serve](https://docs.ray.io/en/latest/serve/index.html) is a highly scalable and flexible model serving library for building online inference APIs. You can:\n",
    "- Wrap models and business logic as separate [serve deployments](https://docs.ray.io/en/latest/serve/key-concepts.html#deployment) and [connect](https://docs.ray.io/en/latest/serve/model_composition.html) them together (pipeline, ensemble, etc.)\n",
    "- Avoid one large service that's network and compute bounded and an inefficient use of resources\n",
    "- Utilize fractional heterogeneous [resources](https://docs.ray.io/en/latest/serve/resource-allocation.html), which **isn't possible** with SageMaker, Vertex, KServe, etc., and horizontally scale, with `num_replicas`\n",
    "- [Autoscale](https://docs.ray.io/en/latest/serve/autoscaling-guide.html) up and down based on traffic\n",
    "- Integrate with [FastAPI and HTTP](https://docs.ray.io/en/latest/serve/http-guide.html)\n",
    "- Set up a [gRPC service](https://docs.ray.io/en/latest/serve/advanced-guides/grpc-guide.html#set-up-a-grpc-service) to build distributed systems and microservices\n",
    "- Enable [dynamic batching](https://docs.ray.io/en/latest/serve/advanced-guides/dyn-req-batch.html) based on batch size, time, etc.\n",
    "- Access a suite of [utilities for serving LLMs](https://docs.ray.io/en/latest/serve/llm/serving-llms.html) that are inference-engine agnostic and have batteries-included support for LLM-specific features such as multi-LoRA support\n",
    "\n",
    "<img src=\"https://github.com/anyscale/e2e-xgboost/blob/main/images/ray_serve.png?raw=true\" width=600>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6f5493d9",
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext autoreload\n",
    "%autoreload all"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2d16364b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Enable loading of the dist_xgboost module.\n",
    "import os\n",
    "import sys\n",
    "\n",
    "sys.path.append(os.path.abspath(\"..\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e34bfa8e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Enable Ray Train v2.\n",
    "os.environ[\"RAY_TRAIN_V2_ENABLED\"] = \"1\"\n",
    "# Now it's safe to import from ray.train."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f01d1fca",
   "metadata": {},
   "outputs": [],
   "source": [
    "import ray\n",
    "import dist_xgboost\n",
    "\n",
    "# Initialize Ray with the dist_xgboost package.\n",
    "ray.init(runtime_env={\"py_modules\": [dist_xgboost]})"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "27a5b2ba",
   "metadata": {},
   "source": [
    "## Loading the model\n",
    "\n",
    "Next, load the pre-trained preprocessor and XGBoost model from the MLflow registry as demonstrated in the validation notebook."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "bf58fa92",
   "metadata": {},
   "source": [
    "## Creating a Ray Serve deployment\n",
    "\n",
    "Next, define the Ray Serve endpoint. Use a reusable class to avoid reloading the model and preprocessor for each request. The deployment supports both Pythonic and HTTP requests."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bb59a703",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import xgboost\n",
    "from ray import serve\n",
    "from starlette.requests import Request\n",
    "\n",
    "from dist_xgboost.data import load_model_and_preprocessor\n",
    "\n",
    "\n",
    "@serve.deployment(num_replicas=2, max_ongoing_requests=25, ray_actor_options={\"num_cpus\": 2})\n",
    "class XGBoostModel:\n",
    "    def __init__(self):\n",
    "        self.preprocessor, self.model = load_model_and_preprocessor()\n",
    "\n",
    "    @serve.batch(max_batch_size=16, batch_wait_timeout_s=0.1)\n",
    "    async def predict_batch(self, input_data: list[dict]) -> list[float]:\n",
    "        print(f\"Batch size: {len(input_data)}\")\n",
    "        # Convert list of dictionaries to DataFrame.\n",
    "        input_df = pd.DataFrame(input_data)\n",
    "        # Preprocess the input.\n",
    "        preprocessed_batch = self.preprocessor.transform_batch(input_df)\n",
    "        # Create DMatrix for prediction.\n",
    "        dmatrix = xgboost.DMatrix(preprocessed_batch)\n",
    "        # Get predictions.\n",
    "        predictions = self.model.predict(dmatrix)\n",
    "        return predictions.tolist()\n",
    "\n",
    "    async def __call__(self, request: Request):\n",
    "        # Parse the request body as JSON.\n",
    "        input_data = await request.json()\n",
    "        return await self.predict_batch(input_data)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a3cc6588",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert\"> <b>🧱 Model composition</b>\n",
    "\n",
    "Ray Serve makes it extremely easy to do [model composition](https://docs.ray.io/en/latest/serve/model_composition.html) where you can compose multiple deployments containing ML models or business logic into a single application. You can independently scale even fractional resources, and configure each of the deployments.\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/serve_composition.png\" width=800>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c06e9701",
   "metadata": {},
   "source": [
    "Ensure that you don't have any existing deployments first using [`serve.shutdown()`](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.shutdown.html#ray.serve.shutdown):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bfec0c7a",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2025-04-16 21:35:03,819\tINFO worker.py:1660 -- Connecting to existing Ray cluster at address: 10.0.23.200:6379...\n",
      "2025-04-16 21:35:03,828\tINFO worker.py:1843 -- Connected to Ray cluster. View the dashboard at \u001b[1m\u001b[32mhttps://session-1kebpylz8tcjd34p4sv2h1f9tg.i.anyscaleuserdata.com \u001b[39m\u001b[22m\n",
      "2025-04-16 21:35:03,833\tINFO packaging.py:367 -- Pushing file package 'gcs://_ray_pkg_dbf2a602028d604b4b1f9474b353f0574c4a48ce.zip' (0.08MiB) to Ray cluster...\n",
      "2025-04-16 21:35:03,834\tINFO packaging.py:380 -- Successfully pushed file package 'gcs://_ray_pkg_dbf2a602028d604b4b1f9474b353f0574c4a48ce.zip'.\n"
     ]
    }
   ],
   "source": [
    "if \"default\" in serve.status().applications and serve.status().applications[\"default\"].status == \"RUNNING\":\n",
    "    print(\"Shutting down existing serve application\")\n",
    "    serve.shutdown()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7b67d6ca",
   "metadata": {},
   "source": [
    "Now that you've defined the deployment, you can create a `ray.serve.Application` using the [`.bind()`](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.Deployment.html#ray.serve.Deployment) method:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ecd2bbbf",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define the app.\n",
    "xgboost_model = XGBoostModel.bind()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "07a9892e",
   "metadata": {},
   "source": [
    "## Preparing test data\n",
    "\n",
    "Prepare some example data to test the deployment. Use a sample from the hold-out set:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4ba658c1",
   "metadata": {},
   "outputs": [],
   "source": [
    "sample_input = {\n",
    "    \"mean radius\": 14.9,\n",
    "    \"mean texture\": 22.53,\n",
    "    \"mean perimeter\": 102.1,\n",
    "    \"mean area\": 685.0,\n",
    "    \"mean smoothness\": 0.09947,\n",
    "    \"mean compactness\": 0.2225,\n",
    "    \"mean concavity\": 0.2733,\n",
    "    \"mean concave points\": 0.09711,\n",
    "    \"mean symmetry\": 0.2041,\n",
    "    \"mean fractal dimension\": 0.06898,\n",
    "    \"radius error\": 0.253,\n",
    "    \"texture error\": 0.8749,\n",
    "    \"perimeter error\": 3.466,\n",
    "    \"area error\": 24.19,\n",
    "    \"smoothness error\": 0.006965,\n",
    "    \"compactness error\": 0.06213,\n",
    "    \"concavity error\": 0.07926,\n",
    "    \"concave points error\": 0.02234,\n",
    "    \"symmetry error\": 0.01499,\n",
    "    \"fractal dimension error\": 0.005784,\n",
    "    \"worst radius\": 16.35,\n",
    "    \"worst texture\": 27.57,\n",
    "    \"worst perimeter\": 125.4,\n",
    "    \"worst area\": 832.7,\n",
    "    \"worst smoothness\": 0.1419,\n",
    "    \"worst compactness\": 0.709,\n",
    "    \"worst concavity\": 0.9019,\n",
    "    \"worst concave points\": 0.2475,\n",
    "    \"worst symmetry\": 0.2866,\n",
    "    \"worst fractal dimension\": 0.1155,\n",
    "}\n",
    "sample_target = 0  # Ground truth label"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "75c9b08c",
   "metadata": {},
   "source": [
    "## Running the service\n",
    "\n",
    "There are two ways to run a Ray Serve service:\n",
    "\n",
    "1) **Serve API**: use the [`serve run`](https://docs.ray.io/en/latest/serve/getting_started.html#running-a-ray-serve-application) CLI command, like `serve run tutorial:xgboost_model`.\n",
    "2) **Pythonic API**: use `ray.serve`'s [`serve.run` command](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.run.html#ray.serve.run), like `serve.run(xgboost_model)`.\n",
    "\n",
    "This example uses the Pythonic API:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "03964807",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO 2025-04-16 21:35:08,246 serve 30790 -- Started Serve in namespace \"serve\".\n",
      "INFO 2025-04-16 21:35:13,363 serve 30790 -- Application 'xgboost-breast-cancer-classifier' is ready at http://127.0.0.1:8000/.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ProxyActor pid=31032)\u001b[0m INFO 2025-04-16 21:35:08,167 proxy 10.0.23.200 -- Proxy starting on node dc30e171b93f61245644ba4d0147f8b27f64e9e1eaf34d1bb63c9c99 (HTTP port: 8000).\n",
      "\u001b[36m(ProxyActor pid=31032)\u001b[0m INFO 2025-04-16 21:35:08,226 proxy 10.0.23.200 -- Got updated endpoints: {}.\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:08,307 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).\n",
      "\u001b[36m(ProxyActor pid=31032)\u001b[0m INFO 2025-04-16 21:35:08,310 proxy 10.0.23.200 -- Got updated endpoints: {Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier'): EndpointInfo(route='/', app_is_cross_language=False)}.\n",
      "\u001b[36m(ProxyActor pid=31032)\u001b[0m INFO 2025-04-16 21:35:08,323 proxy 10.0.23.200 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x77864005ee70>.\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:08,411 controller 30973 -- Adding 2 replicas to Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier').\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:09,387 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:10,337 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:10,550 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:11,395 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:12,449 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:13,402 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:13,613 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).\n"
     ]
    }
   ],
   "source": [
    "from ray.serve.handle import DeploymentHandle\n",
    "\n",
    "handle: DeploymentHandle = serve.run(xgboost_model, name=\"xgboost-breast-cancer-classifier\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b8e2b045",
   "metadata": {},
   "source": [
    "You should see some logs indicating that the service is running locally:\n",
    "\n",
    "```bash\n",
    "INFO 2025-04-09 14:06:55,760 serve 31684 -- Started Serve in namespace \"serve\".\n",
    "INFO 2025-04-09 14:06:57,875 serve 31684 -- Application 'default' is ready at http://127.0.0.1:8000/.\n",
    "```\n",
    "\n",
    "You can also check whether it's running using [`serve.status()`](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.status.html#ray.serve.status):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "72794309",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "True"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "serve.status().applications[\"xgboost-breast-cancer-classifier\"].status == \"RUNNING\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "060b974c",
   "metadata": {},
   "source": [
    "## Querying the service\n",
    "\n",
    "### Using HTTP\n",
    "The most common way to query services is with an HTTP request. This request invokes the `__call__` method defined earlier:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2d99974c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Prediction: 0.0503\n",
      "Ground truth: 0\n"
     ]
    }
   ],
   "source": [
    "import requests\n",
    "\n",
    "url = \"http://127.0.0.1:8000/\"\n",
    "\n",
    "prediction = requests.post(url, json=sample_input).json()\n",
    "\n",
    "print(f\"Prediction: {prediction:.4f}\")\n",
    "print(f\"Ground truth: {sample_target}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c6e28fdf",
   "metadata": {},
   "source": [
    "This approach works for processing an individual query, but isn't appropriate if you have many queries. Because `requests.post` is a blocking call, if you run it in a for loop you never benefit from Ray Serve's dynamic batching.\n",
    "\n",
    "Instead, you want to fire many requests concurrently using asynchronous requests and let Ray Serve buffer and batch process them. You can use this approach with `aiohttp`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9d20e07b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m Batch size: 1\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:13,834 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 0ddcd27d-d671-4365-b7e3-6e4cae856d9b -- POST / 200 117.8ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,352 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 aeb83339-359a-41e2-99c4-4ab06252d0b9 -- POST / 200 94.7ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,353 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 8c80adfd-2033-41d3-a718-aecbd5bcb996 -- POST / 200 93.9ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,354 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 7ed45f79-c665-4a17-94f7-6d02c56ab504 -- POST / 200 93.8ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,355 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 56fd016b-497a-43cc-b500-edafe878cda8 -- POST / 200 88.6ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,356 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 4910e208-d042-4fcb-aba9-330400fba538 -- POST / 200 85.5ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,356 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 b4999d9c-72fd-4bd2-aa9c-3c854ebe7457 -- POST / 200 84.7ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,358 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 04bc7c27-ae22-427f-8bee-c9dbc48a0b82 -- POST / 200 85.3ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,358 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 dcbbe5fa-d278-4568-a0fb-ea9347889990 -- POST / 200 84.3ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,359 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 22683613-16a5-479a-92bc-14f07dc317aa -- POST / 200 83.3ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,360 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 b773626c-8607-4572-bb87-8d8f80964de5 -- POST / 200 82.8ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,361 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 bceee2b4-ff30-4866-a300-7591e0cdc598 -- POST / 200 79.2ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,362 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 edaeb2f7-8de3-494d-8db0-8ebf2009acf7 -- POST / 200 74.7ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,362 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 09a38fe8-47d3-4c0e-8f5e-c312cded2c35 -- POST / 200 74.6ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,363 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 7f0d2f52-e59b-4f26-8931-61a1e9e4f988 -- POST / 200 72.9ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,363 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 269b045d-0b42-407d-a52f-7222cafce0d6 -- POST / 200 71.5ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,364 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 98b7ef19-f5a1-4ab2-a71c-a2b7f6a6c1ad -- POST / 200 71.1ms\n",
      "\u001b[36m(ServeController pid=30973)\u001b[0m INFO 2025-04-16 21:35:14,457 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).\n",
      "\u001b[36m(ProxyActor pid=5012, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,484 proxy 10.0.240.129 -- Proxy starting on node 9d22416ba66c129a3b66c96533eaa5455f7e882c37408b4fe7dc81f8 (HTTP port: 8000).\n"
     ]
    }
   ],
   "source": [
    "import asyncio\n",
    "\n",
    "import aiohttp\n",
    "\n",
    "\n",
    "async def fetch(session, url, data):\n",
    "    async with session.post(url, json=data) as response:\n",
    "        return await response.json()\n",
    "\n",
    "\n",
    "async def fetch_all(requests: list):\n",
    "    async with aiohttp.ClientSession() as session:\n",
    "        tasks = [fetch(session, url, input_item) for input_item in requests]\n",
    "        responses = await asyncio.gather(*tasks)\n",
    "        return responses"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3cd89c9e",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m Batch size: 16\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Finished processing 100 queries. Example result: 0.05025313049554825\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ProxyActor pid=5012, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,555 proxy 10.0.240.129 -- Got updated endpoints: {Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier'): EndpointInfo(route='/', app_is_cross_language=False)}.\n",
      "\u001b[36m(ProxyActor pid=5012, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,576 proxy 10.0.240.129 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x7835f2b9acc0>.\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,619 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 24933cc1-07b4-4680-bb84-adcd54ff2de3 -- POST / 200 139.5ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,620 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 15167894-ceac-4464-bbb6-0556c8299d8a -- POST / 200 138.3ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,621 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x e4bb73d9-6b5b-4cd0-8dc0-5bbe5329c29e -- POST / 200 138.6ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,621 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 004be5f3-9ce7-4708-8579-31da77926491 -- POST / 200 94.1ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,621 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 233fc1bb-6486-4704-bf03-8599176e539c -- POST / 200 92.7ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,621 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x cd417685-cad4-4c9d-ab51-fcd33babe57c -- POST / 200 88.5ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,622 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 0ea1c55a-6722-4cb6-a9ab-9e0ffa156ef4 -- POST / 200 84.6ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,622 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 3315400d-9213-46ac-9abd-baa576c73107 -- POST / 200 77.9ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,622 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 25054e1f-e3e7-4106-910b-f6ba94f111be -- POST / 200 76.9ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,623 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x a0dbd826-c595-455f-8869-7c567c0dfac2 -- POST / 200 75.6ms\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m INFO 2025-04-16 21:35:14,623 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 136060ac-9705-49a5-b743-dc29164a3eee -- POST / 200 75.4ms\n"
     ]
    }
   ],
   "source": [
    "sample_input_list = [sample_input] * 100\n",
    "\n",
    "# Notebook is already running an asyncio event loop in background, so use `await`.\n",
    "# In other cases, you would use `asyncio.run(fetch_all(sample_input_list))`.\n",
    "responses = await fetch_all(sample_input_list)\n",
    "print(f\"Finished processing {len(responses)} queries. Example result: {responses[0]}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ff978c09",
   "metadata": {},
   "source": [
    "### Using Python\n",
    "\n",
    "For a more direct Pythonic way to query the model, you can use the deployment handle:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "96becd4b",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO 2025-04-16 21:35:14,803 serve 30790 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x7156ffcf6d80>.\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129)\u001b[0m Batch size: 11\n",
      "\u001b[36m(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129)\u001b[0m Batch size: 1\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0.05025313049554825\n"
     ]
    }
   ],
   "source": [
    "response = await handle.predict_batch.remote(sample_input)\n",
    "print(response)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a358e256",
   "metadata": {},
   "source": [
    "This approach is useful if you need to interact with the service from a different process in the same Ray Cluster. If you need to regenerate the serve handle, you can use [`serve.get_deployment_handle`](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.get_deployment_handle.html):\n",
    "\n",
    "`handle = serve.get_deployment_handle(\"XGBoostModel\", \"xgboost-breast-cancer-classifier\")`"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "15bff8bc",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert\"> <b>🔎 Observability for services</b>\n",
    "\n",
    "The Ray dashboard automatically captures observability for Ray Serve applications in the [Serve view](https://docs.ray.io/en/latest/ray-observability/getting-started.html#serve-view). You can view the service [deployments and their replicas](https://docs.ray.io/en/latest/serve/key-concepts.html#serve-key-concepts-deployment) and time-series metrics about the service's health.\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/e2e-xgboost/refs/heads/main/images/serve_dashboard.png\" width=800>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a1f3b467",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Shutdown service.\n",
    "serve.shutdown()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "792348c5",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert\"> <b>Anyscale Services</b>\n",
    "\n",
    "[Anyscale Services](https://docs.anyscale.com/platform/services/) offers a fault tolerant, scalable and optimized way to serve Ray Serve applications. See the [API ref](https://docs.anyscale.com/reference/service-api/) for more details. You can:\n",
    "- [rollout and update](https://docs.anyscale.com/platform/services/update-a-service) services with canary deployment and zero-downtime upgrades.\n",
    "- [monitor](https://docs.anyscale.com/platform/services/monitoring) services through a dedicated service page, unified log viewer, tracing, set up alerts, etc.\n",
    "- scale a service with `num_replicas=auto` and utilize replica compaction to consolidate nodes that are fractionally utilized.\n",
    "- have [head node fault tolerance](https://docs.anyscale.com/platform/services/production-best-practices#head-node-ft). OSS Ray recovers from failed workers and replicas but not head node crashes.\n",
    "- serving [multiple applications](https://docs.anyscale.com/platform/services/multi-app) in a single Service\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/e2e-xgboost/refs/heads/main/images/canary.png\" width=1000>\n",
    "\n",
    "[RayTurbo Serve](https://docs.anyscale.com/rayturbo/rayturbo-serve) on Anyscale has more capabilities on top of Ray Serve:\n",
    "- **fast autoscaling and model loading** to get services up and running even faster with [5x improvements](https://www.anyscale.com/blog/autoscale-large-ai-models-faster) even for LLMs\n",
    "- 54% **higher QPS** and up-to 3x **streaming tokens per second** for high traffic serving use-cases with no proxy bottlenecks\n",
    "- **replica compaction** into fewer nodes where possible to reduce resource fragmentation and improve hardware utilization\n",
    "- **zero-downtime** [incremental rollouts](https://docs.anyscale.com/platform/services/update-a-service/#resource-constrained-updates) so the service is never interrupted\n",
    "- [**different environments**](https://docs.anyscale.com/platform/services/multi-app/#multiple-applications-in-different-containers) for each service in a multi-serve application\n",
    "- **multi availability-zone** aware scheduling of Ray Serve replicas to provide higher redundancy to availability zone failures\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "78b0fdc4",
   "metadata": {},
   "source": [
    "**Note**: \n",
    "- This example uses a `containerfile` to define dependencies, but you could easily use a pre-built image as well.\n",
    "- You can specify the compute as a [compute config](https://docs.anyscale.com/configuration/compute-configuration/) or inline in a [Service config](https://docs.anyscale.com/reference/service-api/) file.\n",
    "- When you don't specify compute and you launch from a workspace, the default is the compute configuration of the workspace.\n",
    "\n",
    "\n",
    "```bash\n",
    "# Production online service.\n",
    "anyscale service deploy dist_xgboost.serve:xgboost_model --name=xgboost-breast_cancer_all_features \\\n",
    "  --containerfile=\"${WORKING_DIR}/containerfile\" \\\n",
    "  --working-dir=\"${WORKING_DIR}\" \\\n",
    "  --exclude=\"\"\n",
    "```\n",
    "\n",
    "\n",
    "Note that for this command to succeed, you need to configure MLflow to store the artifacts in storage that's readable across clusters. Anyscale offers a variety of storage options that work out of the box, such as a [default storage bucket](https://docs.anyscale.com/configuration/storage/#anyscale-default-storage-bucket), as well as [automatically mounted network storage](https://docs.anyscale.com/configuration/storage/) shared at the cluster, user, and cloud levels. You could also set up your own network mounts or storage buckets.\n",
    "\n",
    "Running this command starts a service in production. In the process, Anyscale creates and saves a container image to enable fast starting this service in the future. The link to the endpoint and the bearer token appears in the logs. After the service is running remotely, you need to use the bearer token to query it. Here's how you would modify the preceding `requests` code to use this token:\n",
    "\n",
    "```python\n",
    "# Service specific config. Replace with your own values from the preceding logs.\n",
    "base_url = \"https://xgboost-breast-cancer-all-features-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com\"\n",
    "token = \"tXhmYYY7qMbrb1ToO9_J3n5_kD7ym7Nirs8djtip7P0\"\n",
    "\n",
    "# Requests config.\n",
    "path = \"/\"\n",
    "full_url = f\"{base_url}{path}\"\n",
    "headers = {\"Authorization\": f\"Bearer {token}\"}\n",
    "\n",
    "prediction = requests.post(url, json=sample_input, headers=headers).json()\n",
    "```\n",
    "\n",
    "Don't forget to stop the service once it's no longer needed:\n",
    "\n",
    "```bash\n",
    "anyscale service terminate --name e2e-xgboost\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8b6a519d",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert\"> <b>CI/CD</b>\n",
    "\n",
    "While Anyscale [Jobs](https://docs.anyscale.com/platform/jobs/) and [Services](https://docs.anyscale.com/platform/services/) are useful atomic concepts that help you productionize workloads, they're also convenient for nodes in a larger ML DAG or [CI/CD workflow](https://docs.anyscale.com/ci-cd/). You can chain Jobs together, store results, and then serve the application with those artifacts. From there, you can trigger updates to the service and retrigger the Jobs based on events, time, etc. While you can use the Anyscale CLI to integrate with any orchestration platform, Anyscale does support some purpose-built integrations like [Airflow](https://docs.anyscale.com/ci-cd/apache-airflow/) and [Prefect](https://github.com/anyscale/prefect-anyscale). \n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/e2e-xgboost/refs/heads/main/images/cicd.png\" width=700>\n",
    "\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
